package com.poetic.market.analysis;

import com.poetic.market.analysis.domain.MarketingUserBehavior;
import com.poetic.market.analysis.process.MarketingCountTotal;
import com.poetic.market.analysis.source.SimulatedEventSource;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

import java.time.Duration;

/**
 * Flink job that counts marketing events in total, without splitting them by channel.
 *
 * <p>Events read from {@link SimulatedEventSource} are stamped with their own timestamp, events
 * whose behavior is {@code UNINSTALL} are dropped, and every remaining event is mapped onto one
 * shared key so that a single keyed sliding window (one hour long, advancing every second)
 * receives all of them. Each window is handed to {@link MarketingCountTotal} and the resulting
 * stream is printed.
 *
 * <p>Created by lianghuikun on 2020-09-16.
 *
 * @author lianghuikun
 */
public class AppMarketingStatisticsTask {

    /** Behavior value of the events that are excluded from the count. */
    private static final String UNINSTALL_BEHAVIOR = "UNINSTALL";

    /** The one key shared by all events, so that they all land in the same keyed window. */
    private static final String TOTAL_KEY = "dummyKey";

    /** Out-of-orderness of event timestamps tolerated by the watermark strategy. */
    private static final Duration MAX_OUT_OF_ORDERNESS = Duration.ofSeconds(1);

    /**
     * Builds the streaming pipeline and runs it.
     *
     * @param args command-line arguments; not used
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // The window below is driven by the timestamps carried in the events.
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);

        SingleOutputStreamOperator<MarketingUserBehavior> stream = env
                .addSource(new SimulatedEventSource())
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<MarketingUserBehavior>forBoundedOutOfOrderness(MAX_OUT_OF_ORDERNESS)
                                .withTimestampAssigner(new SerializableTimestampAssigner<MarketingUserBehavior>() {
                                    @Override
                                    public long extractTimestamp(MarketingUserBehavior element, long recordTimestamp) {
                                        return element.getTimestamp();
                                    }
                                }));

        stream
                .filter(new FilterFunction<MarketingUserBehavior>() {
                    @Override
                    public boolean filter(MarketingUserBehavior value) throws Exception {
                        // Constant-first comparison: an event with a null behavior is kept
                        // (and counted) instead of failing the job with a NullPointerException.
                        return !UNINSTALL_BEHAVIOR.equals(value.getBehavior());
                    }
                })
                .map(new MapFunction<MarketingUserBehavior, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(MarketingUserBehavior value) throws Exception {
                        // Every event contributes a count of one under the shared key.
                        return Tuple2.of(TOTAL_KEY, 1L);
                    }
                })
                .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Long> value) throws Exception {
                        return value.f0;
                    }
                })
                // Explicit event-time assigner rather than timeWindow(), whose window type
                // depends on the environment's time characteristic.
                .window(SlidingEventTimeWindows.of(Time.hours(1), Time.seconds(1)))
                .process(new MarketingCountTotal())
                .print();

        env.execute("app marketing statistics");
    }
}
